Merge pull request #627 from dsander/deduplication-agent

Added DeDuplicationAgent

Dominik Sander 10 lat temu
rodzic
commit
fc555758a0

+ 87 - 0
app/models/agents/de_duplication_agent.rb

@@ -0,0 +1,87 @@
1
+module Agents
2
+  class DeDuplicationAgent < Agent
3
+    include FormConfigurable
4
+    cannot_be_scheduled!
5
+
6
+    description <<-MD
7
+      The DeDuplicationAgent receives a stream of events and remits the event if it is not a duplicate.
8
+
9
+      `property` the value that should be used to determine the uniqueness of the event (empty to use the whole payload)
10
+
11
+      `lookback` amount of past Events to compare the value to (0 for unlimited)
12
+
13
+      `expected_update_period_in_days` is used to determine if the Agent is working.
14
+    MD
15
+
16
+    event_description <<-MD
17
+      The DeDuplicationAgent just reemits events it received.
18
+    MD
19
+
20
+    def default_options
21
+      {
22
+        'property' => '{{value}}',
23
+        'lookback' => 100,
24
+        'expected_update_period_in_days' => 1
25
+      }
26
+    end
27
+
28
+    form_configurable :property
29
+    form_configurable :lookback
30
+    form_configurable :expected_update_period_in_days
31
+
32
+    before_create :initialize_memory
33
+
34
+    def initialize_memory
35
+      memory['properties'] = []
36
+    end
37
+
38
+    def validate_options
39
+      unless options['lookback'].present? && options['expected_update_period_in_days'].present?
40
+        errors.add(:base, "The lookback and expected_update_period_in_days fields are all required.")
41
+      end
42
+    end
43
+
44
+    def working?
45
+      event_created_within?(interpolated['expected_update_period_in_days']) && !recent_error_logs?
46
+    end
47
+
48
+    def receive(incoming_events)
49
+      incoming_events.each do |event|
50
+        handle(interpolated(event), event)
51
+      end
52
+    end
53
+
54
+    private
55
+
56
+    def handle(opts, event = nil)
57
+      property = get_hash(options['property'].blank? ? JSON.dump(event.payload) : opts['property'])
58
+      if is_unique?(property)
59
+        created_event = create_event :payload => event.payload
60
+
61
+        log("Propagating new event as '#{property}' is a new unique property.", :inbound_event => event )
62
+        update_memory(property, opts['lookback'].to_i)
63
+      else
64
+        log("Not propagating as incoming event is a duplicate.", :inbound_event => event )
65
+      end
66
+    end
67
+
68
+    def get_hash(property)
69
+      if property.to_s.length > 10
70
+        Zlib::crc32(property).to_s
71
+      else
72
+        property
73
+      end
74
+    end
75
+
76
+    def is_unique?(property)
77
+      !memory['properties'].include?(property)
78
+    end
79
+
80
+    def update_memory(property, amount)
81
+      if amount != 0 && memory['properties'].length == amount
82
+        memory['properties'].shift
83
+      end
84
+      memory['properties'].push(property)
85
+    end
86
+  end
87
+end

+ 127 - 0
spec/models/agents/de_duplication_agent_spec.rb

@@ -0,0 +1,127 @@
1
+require 'spec_helper'
2
+
3
+describe Agents::DeDuplicationAgent do
4
+  def create_event(output=nil)
5
+    event = Event.new
6
+    event.agent = agents(:jane_weather_agent)
7
+    event.payload = {
8
+      :output => output
9
+    }
10
+    event.save!
11
+
12
+    event
13
+  end
14
+
15
+  before do
16
+    @valid_params = {
17
+      :property  => "{{output}}",
18
+      :lookback => 3,
19
+      :expected_update_period_in_days => "1",
20
+    }
21
+
22
+    @checker = Agents::DeDuplicationAgent.new(:name => "somename", :options => @valid_params)
23
+    @checker.user = users(:jane)
24
+    @checker.save!
25
+  end
26
+
27
+  describe "validation" do
28
+    before do
29
+      expect(@checker).to be_valid
30
+    end
31
+
32
+    it "should validate presence of lookback" do
33
+      @checker.options[:lookback] = nil
34
+      expect(@checker).not_to be_valid
35
+    end
36
+
37
+    it "should validate presence of property" do
38
+      @checker.options[:expected_update_period_in_days] = nil
39
+      expect(@checker).not_to be_valid
40
+    end
41
+  end
42
+
43
+  describe "#working?" do
44
+    before :each do
45
+      # Need to create an event otherwise event_created_within? returns nil
46
+      event = create_event
47
+      @checker.receive([event])
48
+    end
49
+
50
+    it "is when event created within :expected_update_period_in_days" do
51
+      @checker.options[:expected_update_period_in_days] = 2
52
+      expect(@checker).to be_working
53
+    end
54
+
55
+    it "isnt when event created outside :expected_update_period_in_days" do
56
+      @checker.options[:expected_update_period_in_days] = 2
57
+
58
+      time_travel_to 2.days.from_now do
59
+          expect(@checker).not_to be_working
60
+      end
61
+    end
62
+  end
63
+
64
+  describe "#receive" do
65
+    before :each do
66
+      @event = create_event("2014-07-01")
67
+    end
68
+
69
+    it "creates events when memory is empty" do
70
+      @event.payload[:output] = "2014-07-01"
71
+      expect {
72
+        @checker.receive([@event])
73
+      }.to change(Event, :count).by(1)
74
+      expect(Event.last.payload[:command]).to eq(@event.payload[:command])
75
+      expect(Event.last.payload[:output]).to eq(@event.payload[:output])
76
+    end
77
+
78
+    it "creates events when new event is unique" do
79
+      @event.payload[:output] = "2014-07-01"
80
+      @checker.receive([@event])
81
+
82
+      event = create_event("2014-08-01")
83
+
84
+      expect {
85
+        @checker.receive([event])
86
+      }.to change(Event, :count).by(1)
87
+    end
88
+
89
+    it "does not create event when event is a duplicate" do
90
+      @event.payload[:output] = "2014-07-01"
91
+      @checker.receive([@event])
92
+
93
+      expect {
94
+        @checker.receive([@event])
95
+      }.to change(Event, :count).by(0)
96
+    end
97
+
98
+    it "should respect the lookback value" do
99
+      3.times do |i|
100
+        @event.payload[:output] = "2014-07-0#{i}"
101
+        @checker.receive([@event])
102
+      end
103
+      @event.payload[:output] = "2014-07-05"
104
+      expect {
105
+        @checker.receive([@event])
106
+      }.to change(Event, :count).by(1)
107
+      expect(@checker.memory['properties'].length).to eq(3)
108
+      expect(@checker.memory['properties']).to eq(["2014-07-01", "2014-07-02", "2014-07-05"])
109
+    end
110
+
111
+    it "should hash the value if its longer then 10 chars" do
112
+      @event.payload[:output] = "01234567890"
113
+      expect {
114
+        @checker.receive([@event])
115
+      }.to change(Event, :count).by(1)
116
+      expect(@checker.memory['properties'].last).to eq('2256157795')
117
+    end
118
+
119
+    it "should use the whole event if :property is blank" do
120
+      @checker.options['property'] = ''
121
+      expect {
122
+        @checker.receive([@event])
123
+      }.to change(Event, :count).by(1)
124
+      expect(@checker.memory['properties'].last).to eq('3023526198')
125
+    end
126
+  end
127
+end